package com.jhf.youke.saga.app.consume;

import cn.hutool.json.JSONUtil;
import com.jhf.youke.core.ddd.DomainMq;
import com.jhf.youke.core.entity.SagaMsg;
import com.jhf.youke.core.entity.SendResult;
import com.jhf.youke.core.utils.CacheUtils;
import com.jhf.youke.core.utils.Constant;
import com.jhf.youke.core.utils.StringUtils;
import com.jhf.youke.saga.domain.model.Do.SagaDo;
import com.jhf.youke.saga.domain.model.Do.SagaTemplateListDo;
import com.jhf.youke.saga.domain.model.po.SagaMessagePo;
import com.jhf.youke.saga.domain.service.SagaMessageService;
import com.jhf.youke.saga.domain.service.SagaService;
import com.jhf.youke.saga.domain.service.SagaUnusualService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

/**
 * Application service that advances a saga in response to consumed messages.
 *
 * <p>{@link #create(Map)} starts (or resumes) the saga for a business key and
 * {@link #reply(Map)} handles a reply for an existing saga. Both dispatch the
 * pending saga step through {@link DomainMq} and persist the resulting send
 * status ({@code SEND_OK} / {@code SEND_FAIL}) on the saga.
 *
 * date: 2022/9/23 14:12
 * @author: cyx
 */
@Slf4j
@Service
public class SagaConsumerService {

    @Resource
    private SagaService sagaService;

    @Resource
    private SagaMessageService sagaMessageService;

    @Resource
    private SagaUnusualService sagaUnusualService;

    @Resource
    private DomainMq domainMq;

    /**
     * Processes a reply message and, when a follow-up step is pending, sends it to MQ.
     *
     * <p>The reply is first handed to {@code sagaService.reply(map)}. If that yields a
     * next step whose status is {@code SAGA_CREATE} or {@code SEND_FAIL}, the step is
     * sent through {@link DomainMq}; its status (and message id on success) is then
     * updated and persisted via {@code sagaService.update}.
     *
     * @param map reply payload; the keys {@code code}, {@code bizId} and {@code objectId}
     *            are read here, the rest is interpreted by {@code sagaService.reply}
     * @return the next saga step, or {@code null} when {@code sagaService.reply}
     *         returned none (the transaction is finished)
     */
    public SagaDo reply(Map<String, Object> map) {
        log.info("reply start {}", map);
        SagaDo next = sagaService.reply(map);
        log.info("is next :{} ",  next);

        // A null next node means there is nothing left to run: the transaction has ended.
        if (next != null && isAwaitingSend(next)) {
            // The stored message is looked up only for the log line below, so the
            // lookup is done solely on the path that actually sends.
            String code = StringUtils.chgNull(map.get("code"));
            Long bizId = StringUtils.toLong(map.get("bizId"));
            Integer objectId = StringUtils.toInteger(map.get("objectId"));
            SagaMessagePo msg = sagaMessageService.getMessage(code, bizId, objectId);
            log.info("reply send mq message {}", JSONUtil.toJsonStr(msg));

            SendResult result = domainMq.send(sagaService.sendMq(next, sagaService.getMsg(next)));
            // Only after the send outcome is known is the saga status updated.
            if (isSendOk(result)) {
                log.info("mq send ok, messageId {}", result.getData().getMsgId());
                next.setMessageId(result.getData().getMsgId());
                next.setStatus(SagaDo.SEND_OK);
            } else {
                next.setStatus(SagaDo.SEND_FAIL);
                log.warn("mq send fail");
            }
            sagaService.update(next);
        }
        log.info("saga reply end ");
        return next;
    }


    /**
     * Records an incoming saga message and dispatches the saga's first step to MQ.
     *
     * <p>If a saga already exists for the message's code / bizId / objectId, the first
     * one found is reused; otherwise a new saga is created from the template list. The
     * step is sent only while its status is {@code SAGA_CREATE} or {@code SEND_FAIL},
     * so a saga that was already sent successfully is not sent again. On a failed send
     * the saga is additionally handed to {@code sagaUnusualService.saveBySaga} and
     * written to the cache through {@code CacheUtils.setHashItem}.
     *
     * @param map raw message payload used to build the {@link SagaMsg}
     * @return always {@code null}. NOTE(review): the declared return type suggests the
     *         processed sagas were meant to be returned; confirm with callers before
     *         changing this.
     */
    public List<SagaDo> create(Map<String, Object> map) {
        SagaMsg msg = new SagaMsg(map);
        List<SagaDo> sagaDoList = sagaService.getListByBiz(msg.getCode(), msg.getBizId(), msg.getObjectId());
        sagaMessageService.create(msg);

        SagaDo saga;
        if (sagaDoList == null || sagaDoList.isEmpty()) {
            List<SagaTemplateListDo> list = sagaService.getTemplateList(msg);
            saga = sagaService.create(msg, list);
        } else {
            saga = sagaDoList.get(0);
        }

        // Send to MQ only for a newly created saga or one whose previous send failed;
        // this keeps repeated deliveries of the same message idempotent.
        if (isAwaitingSend(saga)) {
            SendResult result = domainMq.send(sagaService.sendMq(saga, msg.getMessage()));
            // Only after the send outcome is known is the saga status updated.
            if (isSendOk(result)) {
                log.info("mq messageId {}", result.getData().getMsgId());
                saga.setMessageId(result.getData().getMsgId());
                saga.setStatus(SagaDo.SEND_OK);
            } else {
                saga.setStatus(SagaDo.SEND_FAIL);
                log.warn("mq send fail");
                // Persist the failure in the exception log.
                sagaUnusualService.saveBySaga(saga);
                // Also store the failure in the Redis exception queue.
                CacheUtils.setHashItem(sagaService.setHashItem(saga));
            }
            sagaService.updateByBiz(saga);
        }

        return null;
    }

    /** Returns whether the saga step still has to be sent (new, or last send failed). */
    private static boolean isAwaitingSend(SagaDo saga) {
        return SagaDo.SAGA_CREATE.equals(saga.getStatus())
                || SagaDo.SEND_FAIL.equals(saga.getStatus());
    }

    /** Returns whether the MQ send succeeded; a missing result counts as a failure. */
    private static boolean isSendOk(SendResult result) {
        return result != null && Constant.RESPONSE_OK.equals(result.getCode());
    }

}
